摘要
這篇文章探討了如何將 LangGraph 的強大功能與 MongoDB 的資料儲存和檢索能力相結合,打造一個智慧的工地安全監控系統。文章首先介紹了 LangGraph 的 Checkpointer 機制,以及使用記憶體儲存方式的局限性。接著,文章重點介紹了如何使用 MongoDB 作為 Checkpointer 來解決這些問題,並展示了如何在 MongoDB 中儲存和管理工地安全事件資料。
文章的核心概念是 Agentic RAG,一種結合大語言模型和資訊檢索技術的人工智慧開發範式。透過使用 OpenAI 的嵌入模型將文字資料轉換為向量表示,並利用 MongoDB 的 $vectorSearch 運算子進行向量搜尋,系統可以理解使用者自然語言查詢,快速找到語意相關的安全事件記錄,並提供詳細的資訊。
文章最後展示了如何將 LangGraph 的工作流與 MongoDB 整合,讓系統能夠即時分析安全事件資料,持久化儲存對話歷史和系統狀態,並利用向量搜尋快速檢索相關資訊。透過這個整合,文章打造了一個智慧而高效的工地安全監控系統,並強調這種技術組合可以應用於許多不同的領域。
在上一篇文章中,我們深入探討了 LangGraph 中的 Checkpointer 機制,特別關注了如何使用 MemorySaver 來實現基本的對話歷史管理。這種方法為我們提供了一個簡單而有效的方式來在應用程式運行期間保存和檢索狀態資訊。
然而,隨著應用規模的擴大和需要處理的資料量的增加,記憶體存儲方式可能會面臨一些挑戰,比如:
為了解決這些問題,我們需要探索更強大、更靈活的儲存解決方案。這就是為什麼在本篇文章中,我們將把注意力轉向外部資料庫,特別是 MongoDB。在接下來的內容中,我們將透過一個實際的案例 —— 工地安全監控系統 —— 來演示如何在 LangGraph 應用中整合和使用 MongoDB。
想像一個場景:在一個繁忙的建築工地上,數百名工人、數十台重型機械同時運作。如何確保每個人的安全?如何及時發現並預防潛在的危險?這就是我們的智慧監控系統要解決的核心問題。
本系統的主要功能包括:
1.資料儲存與管理:利用 MongoDB 高效儲存和管理大量複雜的安全事件資料。
2.智慧分析:運用 LangGraph 框架進行自然語言處理和智慧決策。
3.互動式查詢:允許安全管理人員透過自然語言對話方式查詢和分析安全資訊。
Agentic RAG 是人工智慧應用開發的一個新興範式,它結合了大語言模型(LLM)的強大能力和資訊檢索技術。在我們的工地安全場景中,這意味著系統可以:
要理解 Agentic RAG,我們需要了解幾個關鍵概念:
假設情境如下:
位於台北市的「新北科技園區」正在進行一項大型建設專案,預計在三年內完成一座現代化的科技園區。為了確保工程進行期間的安全,專案管理團隊引入了一套先進的安全監控系統。 這套系統整合了多種技術,包括:
分佈在工地各處的高清攝影機 智能識別軟體 即時通報機制
系統的主要目標是預防事故發生,並在發生異常情況時能夠迅速反應。
為了模擬真實的工地安全監控場景,我們創建了一個包含多個安全事件的資料集。這些事件涵蓋了各種安全問題,如設備故障、人員違規和環境危害等。
##數據欄位說明
範例如下
事件ID: INC0001
日期時間: 2024-02-24 15:32:35
事件類型: 火災
嚴重程度: 危急
地點: 機械操作區-3區
涉及人員: 曹彥廷
人員角色: 工程師
攝影機ID: CAM009
攝影機類型: 熱感應攝影機
異常事件類型: 違規操作
處理人員: 曹彥廷
影片來源: /video_storage/2024/02/24/CAM009_20240224_153235.mp4
解讀:
2024年2月24日下午3點32分,在機械操作區-3區發生了一起嚴重的火災事件。涉及人員是工程師曹彥廷,可能由於違規操作引發。事件被 CAM009 號熱感應攝影機記錄,處理人員同樣是曹彥廷。相關的視頻證據存儲在指定的位置。
聲明:模擬工地現場存在 Schema,全程使用 Faker 套件製作,如果人名、地點、時間雷同,純屬巧合。
在我們的 RAG 系統中,MongoDB 扮演著雙重角色:既是傳統的操作型資料庫,又是高效的向量資料庫。特別是 MongoDB Atlas,它為我們提供了一個強大的解決方案,能夠高效地儲存、查詢和檢索向量嵌入。
讓我們一步步來設定我們的 MongoDB Atlas 環境:
Caption: 成功建置 DB 逅畫面
在 Atlas 中,我們需要創建用於儲存安全事件資料的資料庫和集合:
construction_safety_ai_use_case
的資料庫。safety_reports
的集合。為了支援高效的向量搜尋,我們需要為 safety_reports
集合創建一個特殊的索引:
safety_reports
集合創建一個向量搜尋索引,命名為 safety_reports_vector_index
。{
"fields": [
{
"numDimensions": 256,
"path": "embedding",
"similarity": "cosine",
"type": "vector"
}
]
}
這個索引將使我們的 RAG 應用能夠透過向量搜尋快速檢索相關記錄,為使用者查詢提供額外的上下文資訊。
取得 MongoDB Atlas 叢集的連線 URI 後,我們可以使用 PyMongo 函式庫輕鬆建立連線:
import os
from pymongo import MongoClient
MONGO_URI = os.environ.get("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database("construction_safety_ai_use_case")
collection = db.get_collection("safety_reports")
注意事項:
移除任何集合,通常需要安置這個,可能會建立許多次
# Delete any existing records in the collection
collection.delete_many({})
將合併後 dataframe 上傳
documents = [row.to_dict() for _, row in df.iterrows()]
collection.insert_many(documents)
print("Data ingestion into MongoDB completed")
成功後你會在 MongoDB 獲得類似以下畫面
Caption: 已有上傳資料在 MongoDB 截圖
透過以上步驟,我們就成功設定了 MongoDB 向量資料庫,為後續的 RAG 系統開發奠定了堅實的基礎。這個設定不僅支援傳統的資料操作,還能高效處理向量搜尋,這對於我們的工地安全監控系統至關重要。
如果在操作過程中遇到任何問題,可以參考 MongoDB 的官方文件或尋求技術支援。接下來,我們將探討如何利用這個強大的資料庫設定來實現我們的智慧安全監控系統。
在我們的工地安全監控系統中,向量檢索功能是實現智慧查詢和分析的關鍵。這個功能允許我們基於語意相似性快速找到相關的安全事件記錄,為使用者提供精確而相關的資訊。讓我們逐步實現這個功能:
首先,我們需要將文字資料轉換為向量表示。這裡我們使用 OpenAI 的嵌入模型:首先,我们需要将文本数据转换为向量表示。这里我们使用 OpenAI 的嵌入模型:
from langchain_openai import OpenAIEmbeddings
OPEN_AI_EMBEDDING_MODEL = "text-embedding-3-small"
OPEN_AI_EMBEDDING_MODEL_DIMENSION = 256
embedding_model = OpenAIEmbeddings(model=OPEN_AI_EMBEDDING_MODEL, dimensions=OPEN_AI_EMBEDDING_MODEL_DIMENSION)
def get_embedding(text):
return embedding_model.embed_query(text=text)
接下來,我們實現一個函數來執行向量搜尋:
def vector_search(user_query, collection):
query_embedding = get_embedding(user_query)
pipeline = [
{
"$vectorSearch": {
"index": "safety_reports_vector_index",
"queryVector": query_embedding,
"path": "embedding",
"numCandidates": 100,
"limit": 5
}
},
{
"$unset": "embedding"
},
{
"$project": {
"_id": 0,
"事件ID": 1,
"日期時間": 1,
"違規事件類型": 1,
"地點": 1,
"違規人員": 1,
"違反職安條款": 1,
"score": {"$meta": "vectorSearchScore"}
}
}
]
return list(collection.aggregate(pipeline))
這個函數執行以下操作:
為了更好地展示搜尋結果,我們可以實現一個格式化函數:
def format_search_results(results):
formatted_results = []
for result in results:
formatted_result = (
f"事件ID: {result['事件ID']}\n"
f"日期時間: {result['日期時間']}\n"
f"違規事件類型: {result['違規事件類型']}\n"
f"地點: {result['地點']}\n"
f"違規人員: {result['違規人員']}\n"
f"違反職安條款: {result['違反職安條款']}\n"
f"相似度分數: {result['score']:.4f}\n"
)
formatted_results.append(formatted_result)
return "\n".join(formatted_results)
最後,我們可以測試我們的向量檢索功能:
def test_vector_search(query):
results = vector_search(query, collection)
print(f"查詢: {query}\n")
print(format_search_results(results))
# 測試查詢
test_queries = [
"辦公區最近有什麼異常情況嗎?",
"停車場有沒有什麼安全問題?",
"有人在工作時沒戴安全裝備嗎?",
"材料堆放區有什麼需要注意的嗎?",
]
for query in test_queries:
test_vector_search(query)
print("\n" + "="*50 + "\n")
這個測試函數允許我們輸入不同的查詢,並查看系統如何檢索相關的安全事件記錄。
透過實現這個向量檢索功能,我們的工地安全監控系統現在能夠:
這個功能為我們的系統增加了強大的語意理解能力,使其不僅能夠基於關鍵字匹配來查找資訊,還能理解查詢的上下文和意圖,從而提供更加精準和有價值的結果。在下一節中,我們將探討如何將這個功能整合到我們的 LangGraph 工作流中,進一步增強系統的智慧性。
目前為止完成基礎的工作,整理資料及、建立向量資訊、存放資料庫、驗證檢索是否成功,接下來才要跟語言模型有關,在這之前都是 AI 沒啥關聯的XD
在這一節中,我們將探討如何將 LangGraph 的強大功能與 MongoDB 的資料儲存和檢索能力相結合,創建一個智慧的工地安全監控系統。
LangGraph 的一個關鍵特性是能夠保存和恢復狀態。為了利用 MongoDB 的持久化儲存能力,我們實現了一個自定義的 MongoDBSaver
類別:
import pickle
from contextlib import AbstractContextManager
from types import TracebackType
from typing import Any, Dict, Optional, AsyncIterator, Union, List, Tuple
from langchain_core.runnables import RunnableConfig
from typing_extensions import Self
from langgraph.checkpoint.base import (
BaseCheckpointSaver,
Checkpoint,
CheckpointMetadata,
CheckpointTuple,
SerializerProtocol,
)
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
from motor.motor_asyncio import AsyncIOMotorClient
from datetime import datetime, timezone
class JsonPlusSerializerCompat(JsonPlusSerializer):
def loads(self, data: bytes) -> Any:
if data.startswith(b"\x80") and data.endswith(b"."):
return pickle.loads(data)
return super().loads(data)
class MongoDBSaver(AbstractContextManager, BaseCheckpointSaver):
serde = JsonPlusSerializerCompat()
client: AsyncIOMotorClient
db_name: str
collection_name: str
def __init__(
self,
client: AsyncIOMotorClient,
db_name: str,
collection_name: str,
*,
serde: Optional[SerializerProtocol] = None,
) -> None:
super().__init__(serde=serde)
self.client = client
self.db_name = db_name
self.collection_name = collection_name
self.collection = client[db_name][collection_name]
# ... [其他方法實現]
async def aput(
self,
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: Optional[dict[str, Union[str, float, int]]]
) -> RunnableConfig:
doc = {
"thread_id": config["configurable"]["thread_id"],
"thread_ts": checkpoint["id"],
"checkpoint": self.serde.dumps(checkpoint),
"metadata": self.serde.dumps(metadata),
}
if config["configurable"].get("thread_ts"):
doc["parent_ts"] = config["configurable"]["thread_ts"]
await self.collection.insert_one(doc)
return {
"configurable": {
"thread_id": config["configurable"]["thread_id"],
"thread_ts": checkpoint["id"],
}
}
這個 MongoDBSaver
類別允許 LangGraph 將狀態資訊儲存在 MongoDB 中,實現了跨會話的持久化和狀態恢復。
補充說明
在官方文件中有分成 sync 以及 async 的實作方式,可以參考這個連結,後來找到網友討論說 MongoDB 官方也有自己實現一版,可以參考這個 Repo,MongoDB 官方部落格也會不定時更新相關文章,強烈建議大家 Follow 起來。
為了使我們的系統能夠執行各種任務,我們定義了一系列工具和代理:
@tool
def list_safety_incidents(limit: int = 10, skip: int = 0, sort_by: str = "日期時間", sort_order: int = -1) -> str:
# ... [實現程式碼]
@tool
def search_safety_incident(incident_id: str) -> str:
# ... [實現程式碼]
@tool
def search_safety_incidents_by_type(event_type: str, limit: int = 5) -> str:
# ... [實現程式碼]
safety_tools = [
list_safety_incidents,
search_safety_incident,
search_safety_incidents_by_type,
]
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(temperature=0)
def create_agent(llm, tools, system_message: str):
# ... [實現程式碼]
safety_management_agent = create_agent(
llm,
safety_tools,
system_message="""
你是一個先進的安全事件管理分析助理(SEMA),專門負責工作場所的安全事件分析和管理。你的主要責任包括:
1. 安全事件分析
2. 支援安全決策
3. 產生報告和分析
...
"""
)
這些工具和代理使我們的系統能夠執行複雜的安全分析任務。
最後,我們使用 LangGraph 構建一個工作流,將所有組件整合在一起:
from langgraph.graph import END, StateGraph
from langgraph.prebuilt import tools_condition
workflow = StateGraph(AgentState)
workflow.add_node("chatbot", chatbot_node)
workflow.add_node("tools", tool_node)
workflow.set_entry_point("chatbot")
workflow.add_conditional_edges(
"chatbot",
tools_condition,
{"tools": "tools", END: END}
)
workflow.add_edge("tools", "chatbot")
mongo_client = AsyncIOMotorClient(MONGO_URI)
mongodb_checkpointer = MongoDBSaver(mongo_client, DB_NAME, "state_store")
graph = workflow.compile(checkpointer=mongodb_checkpointer)
這個工作流定義了系統的行為邏輯,包括如何處理使用者輸入、何時使用工具,以及如何產生回應。
為了與使用者互動,我們實現了一個非同步的對話循環:
import asyncio
from langchain_core.messages import HumanMessage, AIMessage
import time
async def chat_loop():
config = {"configurable": {"thread_id": "10"}}
print("可以輸入 'quit', 'exit', 'q' 來結束對話")
while True:
user_input = await asyncio.get_event_loop().run_in_executor(None, input, "使用者: ")
if user_input.lower() in ["quit", "exit", "q"]:
print("再見!")
break
sanitized_name = sanitize_name("Human") or "匿名"
state = {"messages": [HumanMessage(content=user_input, name=sanitized_name)]}
print("助理: ", end="", flush=True)
max_retries = 3
retry_delay = 1
for attempt in range(max_retries):
try:
async for chunk in graph.astream(state, config, stream_mode="values"):
# ... [處理回應]
break
except Exception as e:
# ... [錯誤處理]
print("\n")
await chat_loop()
透過這種整合,我們創建了一個強大的工地安全監控系統,它能夠:
在這篇文章中,我們深入探討了如何在 LangGraph 中使用 MongoDB 作為 Checkpointer。我們選擇了工地安全監控系統作為例子,不僅因為它很實用,更是為了讓大家更容易理解這些技術概念。
透過這個例子,我們看到了 MongoDB 作為 Checkpointer 的強大之處。它不僅能夠保存對話的狀態,還能在需要時輕鬆恢復,這對於構建複雜的對話系統來說真的很重要。我們還看到了如何將 LangGraph 和 MongoDB 結合起來,創造出一個既智慧又高效的系統。
雖然我們用的是工地安全這個例子,但其實這種技術組合可以用在很多不同的領域。我們希望透過這篇文章,能給大家一些啟發,看看如何在自己的專案中運用這些技術。無論你是在開發什麼樣的系統,只要涉及到複雜的對話和資料管理,這種方法都可能對你有所幫助。
總的來說,我們希望這篇文章不僅讓你了解了如何使用 MongoDB 作為 Checkpointer,還能激發你在自己的專案中嘗試類似的技術組合。
即刻前往教學程式碼 Repo,親自動手實作工安監控系統吧!別忘了給專案按個星星並持續關注更新,讓我們一起探索AI代理的新境界。
碎碎念時間:這邊實作弄好之後,MongoDB 也剛好弄了一篇工地安全助理的教學程式碼,資料集不同之外,邏輯大致上相同,也可以看看這份檔案
X. 參考資料